跳到主要内容

JS stdout大数据量连续写入管道导致堵塞问题的解决方案

· 阅读需 4 分钟

问题背景:在一次将客户服务器的 SAP hana 数据库的数据通过 ETL 工具转移到 clickhouse 数仓时,发现本应是 600 多万行的数据,只有十几万到了数仓,且后台和数据库日志中没有任何报错信息。

目前项目中的 etl 工具是singer.io。框架是 tap 和 target,分别代表数据源和数据目标池。在这个例子中,tap 是一个自定义的 nodejs 脚本,实现从客户服务器的 hana 数据库获得数据,通过 linux 的管道传到 target(clickhouse)。管道的输入端就是 stdout。

如果将 etl 抽取的数据范围缩小,比如limit 100000, 最终落到 ck 中的数据是正常的。说明,数据量是问题的关键。多测试几次发现,数据量可能会在一个 160 多万的数字上下飘动。继续实验,我们模拟一个比较长的 js 对象字符串,console.log 打印出来,并把打印输出通过管道送到一个 csv 中,重复 600 万次。代码如下:

for (let i = 0; i < 6000000; i++) {
console.log(JSON.stringify(bigObject));
}

然后执行命令:node xx.js | data.txt。检查data.txt行数,果然少于 600 万行。这里可以推断出来,是 js 脚本把 buffer 资源都吃光了,并在 buffer 已满时直接退出,且不返回任何报错或警告。于是,我们经过查阅资料,找到了用process.stdout.write()方法来替代console.log。因为process.stdout.write()可以接收一个回调函数作为参数,作为数据确认输出后的处理函数。所以,我们把上述代码改造为:

const writeAsync = (line) => {
return new Promise((resolve, reject) => {
process.stdout.write(`${line}\n`, () => {
resolve("ok");
});
});
};

(async () => {
for (let i = 0; i < 6000000; i++) {
await writeAsync(JSON.stringify(bigObject));
}
})();

用异步的逻辑,保证上一条数据成功输出后,才执行下一次输出。如果不成功,就会一直等待,直到 buffer 有新的剩余空间。这次执行完,data.txt行数果然来到了 600 万,这说明我们的推断是正确的。然后,给 hana 数据库抽取脚本改造成上述逻辑,核心部分代码:

client.execute(sql, (err, rs) => {
if (err) {
client.end();
throw new Error(err);
}

rs.setFetchSize(2048);
rs.createObjectStream()
.on("data", async (data) => {
const recordMessage = {
type: "RECORD",
stream: schemaID,
record: data,
};
await writeAsync(JSON.stringify(recordMessage));
})
.on("finish", function () {
if (!rs.closed) {
rs.close();
}
client.end(() => {
process.exit(0); // 确保退出
});
});
});

采用流的传输方式,避免数据堆积在有限的内存中。